fix(lifecycle): drain StreamManager goroutines in tests #1227
Claude finished @jcfs's task in 6m 43s
Code Review
Overall Assessment
Solid, well-reasoned fix for the CI goleak flake. The two-pronged approach, closing …
Findings
Minor observations (not inline)
📝 Walkthrough
This PR hardens the agent runtime's shutdown lifecycle by adding drain barriers at the …
Changes: Goroutine Lifecycle Hardening and Drain Barriers
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
🧹 Nitpick comments (1)
apps/backend/internal/agent/runtime/lifecycle/streams.go (1)
432-449: 💤 Low value
Consider consolidating the duplicate select branches.
The shutdown select logic duplicates the `ws.Done()` and `waitCh` cases. Since `stopCh` can be selected on even when nil (a receive on a nil channel blocks forever), you could simplify to a single select block:

```go
select {
case <-ws.Done():
case <-sm.stopCh:
	shutdown()
case <-sm.waitCh:
	shutdown()
}
```

A receive on a nil channel never proceeds, so when `stopCh` is nil, only `ws.Done()` and `waitCh` are effective. This removes the conditional branching.

That said, the current explicit nil-check approach is clearer about intent and avoids relying on nil-channel semantics, so this is purely stylistic.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/backend/internal/agent/runtime/lifecycle/streams.go` around lines 432 - 449, The duplicate select branches can be consolidated: remove the if sm.stopCh == nil conditional and replace both branches with one select that listens for <-ws.Done(), <-sm.stopCh, and <-sm.waitCh, calling the existing shutdown() function in the latter two cases; relying on Go's nil-channel semantics (receive on nil blocks) will make the <-sm.stopCh case inert when sm.stopCh is nil while preserving behavior for ws.Done() and sm.waitCh.
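The nil-channel behaviour this nitpick relies on can be checked in isolation. The following is a minimal sketch, not code from the PR: `waitForShutdown` and its parameter names are hypothetical, and it only demonstrates that a `select` case receiving from a nil channel is never chosen.

```go
package main

import (
	"fmt"
	"time"
)

// waitForShutdown mirrors the shape of the consolidated select and reports
// which signal fired first. stop may be nil, in which case its case can
// never be chosen because a receive on a nil channel blocks forever.
// (Hypothetical helper, for illustration only.)
func waitForShutdown(done, stop, wait <-chan struct{}) string {
	select {
	case <-done:
		return "done"
	case <-stop:
		return "stop"
	case <-wait:
		return "wait"
	}
}

func main() {
	done := make(chan struct{}) // never closed in this demo
	wait := make(chan struct{})

	// Close wait shortly after starting; stop is nil and stays inert.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(wait)
	}()

	fmt.Println(waitForShutdown(done, nil, wait)) // prints "wait"
}
```

The trade-off noted in the comment still applies: the single select is shorter, while an explicit nil check states the intent without relying on the reader knowing this rule.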
📒 Files selected for processing (7)
- apps/backend/Makefile
- apps/backend/internal/agent/runtime/agentctl/agent.go
- apps/backend/internal/agent/runtime/agentctl/client.go
- apps/backend/internal/agent/runtime/agentctl/client_close_test.go
- apps/backend/internal/agent/runtime/agentctl/workspace_stream.go
- apps/backend/internal/agent/runtime/lifecycle/manager_interaction.go
- apps/backend/internal/agent/runtime/lifecycle/streams.go
| Filename | Overview |
|---|---|
| apps/backend/internal/agent/runtime/lifecycle/streams.go | Major rework: adds waitCh/waitChOnce to StreamManager; rewrites stopChannelContext to merge two stop channels via sync.Once-guarded goroutine tracked on sm.wg; sleepOrStop and the workspace-stream connected-select both observe waitCh so Wait() is an absolute drain barrier regardless of stopCh |
| apps/backend/internal/agent/runtime/agentctl/client.go | Adds closed flag and workspaceStream pointer; Close() now captures the stream under lock, calls low-level close helpers, then synchronously drains read/write goroutines via ws.Close() + ws.Wait() |
| apps/backend/internal/agent/runtime/agentctl/workspace_stream.go | Adds pre-dial and post-dial closed guard in StreamWorkspace, stores stream on client under lock, and extends readWorkspaceStream's defer with an identity guard for c.workspaceStream matching the existing c.workspaceStreamConn guard |
| apps/backend/internal/agent/runtime/lifecycle/manager_interaction.go | Switches RestartAgentProcess from client.Close() to CloseUpdatesStream() + CloseWorkspaceStream() so the client stays reusable; comment correctly explains why the terminal drain barrier cannot be used here |
| apps/backend/internal/agent/runtime/agentctl/client_close_test.go | New regression test validates workspace stream goroutines drain before Close() returns and that subsequent StreamWorkspace calls error; mock server correctly simulates long-lived WS handlers |
| apps/backend/Makefile | Adds test-lifecycle-goleak target with configurable LIFECYCLE_GOLEAK_COUNT for stress-reproducing the CI leak under -race |
Reviews (3): Last reviewed commit: "fix(lifecycle): drop agent-stream drain ..."
4 issues found across 7 files
Goroutines spawned by `agentctl.Client.StreamUpdates` / `StreamWorkspace` and by `lifecycle.StreamManager.connectWorkspaceStream` could outlive the tests that created them on slow CI runners, causing intermittent `goleak.VerifyTestMain` failures in `internal/agent/runtime/lifecycle` even though every individual test passed. The leak required the test to race: `client.Close()` could return while a workspace dial was still in flight, leaving the just-spawned WS read/write loops with nobody to drain them, and `StreamManager.Wait()` only fired the drain when the external `stopCh` had been closed first.

Make the close paths absolute drain barriers:

- `Client.Close()` now tracks every stream goroutine it spawns (`streamWG` + a per-stream `WorkspaceStream` reference) and blocks until all of them have exited. A `closed` flag flipped under the client mutex makes subsequent `StreamUpdates` / `StreamWorkspace` calls reject the dial, so a Close that races a dial in flight cannot leave behind a stranded WS connection. `manager_interaction.go` switches the restart path to per-stream Close helpers so it can keep reusing the client after teardown.
- `StreamManager.Wait()` closes a new internal `waitCh` that the retry backoff (`sleepOrStop`) and the connected `<-ws.Done() / <-stop>` select also observe. This makes `Wait` an absolute drain barrier regardless of whether the caller ever closes the external `stopCh`. `streamContext` carries both stops through `stopChannelContext` so in-flight WebSocket dials cancel on either signal.

Local stress (`make test-lifecycle-goleak LIFECYCLE_GOLEAK_COUNT=20`) passes 20/20 with `-race` after the fix. The new `TestClientClose_DrainsStreamGoroutines` covers the drain barrier and the post-Close rejection path. Added a `test-lifecycle-goleak` Makefile target as the repro hook for the CI-only flake.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
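The close-barrier idea in this commit message can be sketched independently of the real client. This is a simplified illustration under stated assumptions, not the PR's implementation: the `client` type, `startStream`, `errClosed`, and the single shared `stop` channel are all hypothetical stand-ins for `agentctl.Client`, its stream methods, and its per-stream teardown.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// client is a hypothetical stand-in for agentctl.Client; only the
// close-barrier bookkeeping is modelled here.
type client struct {
	mu     sync.Mutex
	closed bool
	stop   chan struct{}
	wg     sync.WaitGroup
}

var errClosed = errors.New("client closed")

func newClient() *client { return &client{stop: make(chan struct{})} }

// startStream rejects new work once Close has begun. wg.Add happens under
// the same lock that guards closed, so Close cannot miss a goroutine that
// was admitted just before the flag flipped.
func (c *client) startStream() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errClosed
	}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		<-c.stop // stands in for a read/write loop
	}()
	return nil
}

// Close flips the flag, signals every loop, and blocks until all of them
// have exited. Repeated calls are safe and also wait for the drain.
func (c *client) Close() {
	c.mu.Lock()
	if !c.closed {
		c.closed = true
		close(c.stop)
	}
	c.mu.Unlock()
	c.wg.Wait()
}

func main() {
	c := newClient()
	_ = c.startStream()
	c.Close()                    // returns only after the stream goroutine exits
	fmt.Println(c.startStream()) // prints "client closed"
}
```

Checking the flag and registering on the WaitGroup under one lock is what closes the "Close races an in-flight start" window in this sketch; the real client additionally has to handle a dial that completes after the flag flips.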
… races

Address PR review feedback:

- stopChannelContext now registers the mergeStops goroutine on an optional *sync.WaitGroup so sm.wg.Wait stays a true drain barrier even for the connectUpdatesStream path (outer goroutine returns immediately after dial).
- StreamWorkspace re-checks workspaceStreamConn after dial to seal the concurrent-callers race that could orphan a conn.
- readWorkspaceStream's defer now guards the workspaceStreamConn reset by identity so an old read goroutine cannot zero out a newer conn pointer.
- Clarify comments on streamWG, Client.Close, and the mock.connected wait in the close-barrier test.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
0bcdc06 to 7562635
Claude finished @jcfs's task in 3m 51s
Follow-up Commit Review
The second commit ( …
One minor residual note
The …
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/backend/internal/agent/runtime/lifecycle/manager_interaction.go (1)
547-662: 🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift
Extract the restart phases into helpers.
`RestartAgentProcess` is now 116 lines in a very stateful lifecycle path, so adding more restart behavior here pushes it further past the backend lint cap and makes the teardown/re-init flow harder to audit. Please split the stream teardown, subprocess restart, and ACP re-init phases into helpers before this grows again.
As per coding guidelines, "Code-quality limits enforced by .golangci.yml: Functions ≤80 lines, ≤50 statements."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/backend/internal/agent/runtime/lifecycle/manager_interaction.go` around lines 547 - 662, Split RestartAgentProcess into three helper methods to reduce size: extract the stream teardown into a helper (e.g., closeAgentStreams(ctx, execution) that calls execution.agentctl.CloseUpdatesStream/CloseWorkspaceStream and handles prevMode capture), extract the subprocess restart into a helper (e.g., restartAgentSubprocess(ctx, execution, agentConfig) that calls execution.agentctl.Stop, builds freshCmd/freshContinueCmd via m.buildFreshAgentCommand, updates state via m.executionStore.WithLock, waits for agentctl readiness with WaitForReady, and calls m.configureAndStartAgent), and extract ACP re-init into a helper (e.g., initACPAfterRestart(ctx, execution, agentConfig) that resolves MCP servers with m.resolveMcpServers and calls m.initializeACPSessionForRestart then m.reapplySessionModeAfterReset). Keep existing logging, error handling and calls to m.updateExecutionError, and ensure unique symbols used above are invoked so behavior remains identical.
🚧 Files skipped from review as they are similar to previous changes (5)
- apps/backend/internal/agent/runtime/agentctl/client_close_test.go
- apps/backend/internal/agent/runtime/agentctl/agent.go
- apps/backend/internal/agent/runtime/agentctl/workspace_stream.go
- apps/backend/Makefile
- apps/backend/internal/agent/runtime/lifecycle/streams.go
…ascade

The previous Close drain barrier for the agent (updates) stream blocked the on_turn_complete cascade flow. completeAndStopSession calls Close synchronously on the current session's client, and Close was waiting on streamWG (which tracked the read goroutine). The wait stalled the cascade long enough that Step2's auto-started session never got its profile name into the tab, surfacing as a flaky "Profile B tab not visible" failure in workflow-agent-switch E2E shard 7.

Drain barrier is still in place for the workspace stream (where the original goleak flake lived). Agent stream is closed but not awaited: the cascade legitimately stops + restarts the agent stream on the same client, and the workspace stream's existing barrier is sufficient for the leak coverage.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Claude finished @jcfs's task in 5m 9s
Code Review
Overall Assessment
Three-commit arc tells a coherent story. Commit 1 added drain barriers for both streams + workspace stream race guards. Commit 2 addressed all round-1 review findings (mergeStops …
The workspace drain barrier, the original goleak source, is solid. All round-1 and round-2 review issues are resolved. Two residual items noted below.
Findings
1. …
2. Dead code guard in …
What's clean
```diff
-func (c stopChannelContext) Done() <-chan struct{} {
-	if c.stopCh == nil {
+func (c *stopChannelContext) Done() <-chan struct{} {
+	if c.primary == nil && c.secondary == nil {
```
Dead code: this early-return is unreachable via streamContext()
streamContext() always wires sm.waitCh as secondary, and sm.waitCh is initialised unconditionally in NewStreamManager — so secondary is never nil for any context created in production or in tests that go through ConnectAll/ConnectWorkspaceStream.
The `c.primary == nil && c.secondary == nil` branch therefore can't fire in practice. A future reader will likely wonder whether mergeStops can be skipped or whether there's a nil-panic risk, so a short comment here would head off the confusion:

```go
// secondary (sm.waitCh) is always non-nil, so this path is unreachable
// via streamContext(); kept as a defensive guard for external callers.
if c.primary == nil && c.secondary == nil {
	return c.Context.Done()
}
```

Alternatively, remove the guard and let the once.Do path always run; the merged channel has negligible overhead and removing dead code is cleaner. Either way, worth a note.
```go
ws.Close()
ws.Wait()
}
```
agentStreamConn identity guard still missing in readUpdatesStream's defer
This note applies to readUpdatesStream's defer in agent.go (which has no net diff in this PR and can't be commented on directly).
The readWorkspaceStream defer was fixed in commit 2 to guard both workspaceStreamConn and workspaceStream resets with identity checks (if c.workspaceStreamConn == conn). The analogous guard is still absent in readUpdatesStream's defer, which unconditionally writes c.agentStreamConn = nil.
Commit 3 intentionally removed the synchronous drain barrier for the agent stream, meaning the old goroutine can outlive CloseUpdatesStream() and the subsequent StreamUpdates dial in RestartAgentProcess. The race window is now:

1. `CloseUpdatesStream()` → closes old conn, sets `agentStreamConn = nil`
2. `StreamUpdates` → new dial succeeds, sets `agentStreamConn = newConn`
3. Old goroutine exits → `defer` fires: `agentStreamConn = nil` overwrites `newConn`
4. Next `CloseUpdatesStream()` sees nil, silently skips close → active conn orphaned

The fix is one extra if-guard in readUpdatesStream's defer in agent.go:

```go
c.mu.Lock()
if c.agentStreamConn == conn {
	c.agentStreamConn = nil
}
c.mu.Unlock()
```

Greptile flagged this as P1 in their review. The response indicated it would be fixed but it landed only in workspace_stream.go. Low-risk follow-up, but should be tracked before the next restart-path regression.
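The difference between the guarded and unguarded cleanup can be reproduced deterministically without any networking. The sketch below is illustrative only: `conn`, `client`, and the method names are hypothetical placeholders, and the sequence in `main` replays the four-step race above in a single goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical placeholder for a WebSocket connection.
type conn struct{ id int }

// client holds a single "active connection" pointer, as in the review.
type client struct {
	mu     sync.Mutex
	active *conn
}

func (c *client) set(cn *conn) {
	c.mu.Lock()
	c.active = cn
	c.mu.Unlock()
}

func (c *client) current() *conn {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.active
}

// clearUnguarded is the pattern under review: it resets the pointer no
// matter which connection the exiting goroutine owned.
func (c *client) clearUnguarded(_ *conn) {
	c.mu.Lock()
	c.active = nil
	c.mu.Unlock()
}

// clearIfOwner resets the pointer only when it still refers to the
// connection the exiting goroutine was reading from.
func (c *client) clearIfOwner(own *conn) {
	c.mu.Lock()
	if c.active == own {
		c.active = nil
	}
	c.mu.Unlock()
}

func main() {
	oldConn, newConn := &conn{id: 1}, &conn{id: 2}
	c := &client{}

	c.set(oldConn)
	c.set(newConn)                  // restart dialled a new connection
	c.clearUnguarded(oldConn)       // late cleanup from the old read loop
	fmt.Println(c.current() == nil) // prints "true": the live pointer was lost

	c.set(newConn)
	c.clearIfOwner(oldConn)             // same late cleanup, now guarded
	fmt.Println(c.current() == newConn) // prints "true": the guard kept it
}
```

Pointer identity is enough here because each dial produces a distinct connection value; the guard costs one comparison under a lock the defer already takes.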
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/backend/internal/agent/runtime/agentctl/client_close_test.go (1)
79-126: ⚠️ Potential issue | 🟠 Major
Add per-package `goleak.VerifyTestMain(m)` for `agentctl` tests.
`apps/backend/internal/agent/runtime/agentctl/` has no package-level `TestMain` calling `goleak.VerifyTestMain(m)`; the only `goleak.VerifyTestMain` occurrence is in a comment in `client_close_test.go`. Add a per-package `TestMain` that invokes `goleak.VerifyTestMain(m)` (suppress unavoidable third-party goroutines as needed).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/backend/internal/agent/runtime/agentctl/client_close_test.go` around lines 79 - 126, Add a package-level TestMain that calls goleak.VerifyTestMain(m) for the agentctl tests to detect goroutine leaks (e.g. alongside the existing TestClientClose_DrainsWorkspaceStream test); implement TestMain(m *testing.M) to call goleak.VerifyTestMain(m), which runs the tests and exits the process itself, so no separate os.Exit(m.Run()) call is needed, and include a goleak.IgnoreTopFunction or IgnoreTopPackage entries to suppress known unavoidable third-party goroutines (adjust the ignore list as necessary) so legitimate external goroutines don't cause failures.
📒 Files selected for processing (2)
- apps/backend/internal/agent/runtime/agentctl/client.go
- apps/backend/internal/agent/runtime/agentctl/client_close_test.go
1 issue found across 3 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="apps/backend/internal/agent/runtime/agentctl/client.go">
<violation number="1" location="apps/backend/internal/agent/runtime/agentctl/client.go:49">
P1: Agent stream restart can clobber live connection pointer. Old read goroutine sets `agentStreamConn=nil` after new stream starts. Guard cleanup by connection identity.</violation>
</file>
```go
// closed flips to true on Client.Close and prevents new StreamWorkspace
// dials from leaking goroutines past the close barrier. Agent (updates)
// stream is not gated on this flag because the cascade flow legitimately
// stops + restarts the agent stream on the same client; gating it would
```
P1: Agent stream restart can clobber live connection pointer. Old read goroutine sets agentStreamConn=nil after new stream starts. Guard cleanup by connection identity.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At apps/backend/internal/agent/runtime/agentctl/client.go, line 49:
<comment>Agent stream restart can clobber live connection pointer. Old read goroutine sets `agentStreamConn=nil` after new stream starts. Guard cleanup by connection identity.</comment>
<file context>
@@ -43,19 +43,17 @@ type Client struct {
+ // closed flips to true on Client.Close and prevents new StreamWorkspace
+ // dials from leaking goroutines past the close barrier. Agent (updates)
+ // stream is not gated on this flag because the cascade flow legitimately
+ // stops + restarts the agent stream on the same client; gating it would
+ // strand workflow step transitions on a closed client.
closed bool
</file context>
Summary
- `internal/agent/runtime/lifecycle` `goleak.VerifyTestMain` would fail on slow runners with leaked `StreamManager.connectWorkspaceStream` + `WorkspaceStream.writeLoop` / read loop goroutines, even though every individual test passed (see PR "feat: full GitLab integration — parity with GitHub" #1120, run 26745440163, job 78819616867).
- `agentctl.Client.Close()` becomes an absolute drain barrier: it tracks every stream goroutine it spawns and waits for them, plus a `closed` flag that rejects new `StreamUpdates` / `StreamWorkspace` dials so a Close racing an in-flight dial cannot strand a fresh WS connection. The restart path in `manager_interaction.go` switches to per-stream Close helpers so it can keep reusing the client after teardown.
- `StreamManager.Wait()` becomes an absolute drain barrier: it closes a new internal `waitCh` that the retry backoff and the connected `<-ws.Done() / <-stop>` select observe, so drain doesn't depend on the caller closing the external `stopCh` first. `streamContext` carries both stops through `stopChannelContext` so in-flight WS dials cancel on either signal.
- A `make test-lifecycle-goleak LIFECYCLE_GOLEAK_COUNT=N` target (defaults to 20) serves as the repro hook for the flake, plus a `TestClientClose_DrainsStreamGoroutines` regression test in the agentctl package.

Test plan
- `make test-lifecycle-goleak LIFECYCLE_GOLEAK_COUNT=20`: 20/20 clean under `-race`
- `go test -race -count=1 ./internal/agent/runtime/lifecycle/... ./internal/agent/runtime/agentctl/...`
- `go vet ./...`
- `go build ./...`
- `Run Backend Tests` passes on first attempt

🤖 Generated with Claude Code